package org.codehaus.activemq.broker.impl;

import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.Broker;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.capacity.DelegateCapacityMonitor;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.Transaction;
import org.codehaus.activemq.service.TransactionManager;
import org.codehaus.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
import org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager;
import org.codehaus.activemq.service.impl.MessageAckTransactionTask;
import org.codehaus.activemq.service.impl.QueueMessageContainerManager;
import org.codehaus.activemq.service.impl.RedeliverMessageTransactionTask;
import org.codehaus.activemq.service.impl.SendMessageTransactionTask;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.vm.VMPersistenceAdapter;
import org.codehaus.activemq.store.vm.VMTransactionManager;
import org.codehaus.activemq.util.Callback;
import org.codehaus.activemq.util.ExceptionTemplate;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * Default {@link Broker} implementation.
 * <p>
 * Message operations are fanned out to every configured
 * {@link MessageContainerManager}; transaction bookkeeping is handed to a
 * {@link TransactionManager}. Collaborators that have not been injected through
 * the setters are created lazily by {@link #start()}.
 */
public class DefaultBroker extends DelegateCapacityMonitor implements Broker {
	private static final Log log = LogFactory.getLog(DefaultBroker.class);

	/** System property naming the store directory passed to a persistence adapter factory. */
	protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";

	/** System property naming the persistence adapter class to instantiate. */
	protected static final String PERSISTENCE_ADAPTER_PROPERTY = "activemq.persistenceAdapter";

	/** Parameter types of the static <code>newInstance(File)</code> factory looked up reflectively. */
	protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = { File.class };

	/**
	 * Initial limit given to the memory manager (numerically 20 * 1024 * 1024).
	 * NOTE(review): presumably a byte count - confirm against MemoryBoundedQueueManager.
	 */
	private static final long DEFAULT_MAX_MEMORY_USAGE = 20971520L;

	private PersistenceAdapter persistenceAdapter;
	private TransactionManager transactionManager;
	private MessageContainerManager[] containerManagers;
	private File tempDir;
	private final MemoryBoundedQueueManager memoryManager;
	private PreparedTransactionStore preparedTransactionStore;
	private final String brokerName;

	/**
	 * Creates a broker whose persistence adapter will be chosen when
	 * {@link #start()} is called.
	 *
	 * @param brokerName the name reported by {@link #getBrokerName()}
	 */
	public DefaultBroker(String brokerName) {
		this.brokerName = brokerName;
		this.memoryManager = new MemoryBoundedQueueManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE);
		setDelegate(this.memoryManager);
	}

	/**
	 * Creates a broker that uses the supplied persistence adapter.
	 *
	 * @param brokerName         the name reported by {@link #getBrokerName()}
	 * @param persistenceAdapter the adapter to use instead of an automatically created one
	 */
	public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
		this(brokerName);
		this.persistenceAdapter = persistenceAdapter;
	}

	/**
	 * Starts the persistence adapter, the transaction manager and every container
	 * manager, in that order, creating each of them first when none was injected.
	 *
	 * @throws JMSException if a collaborator cannot be created or started
	 */
	public void start() throws JMSException {
		if (this.persistenceAdapter == null) {
			this.persistenceAdapter = createPersistenceAdapter();
		}
		this.persistenceAdapter.start();

		if (this.transactionManager == null) {
			this.preparedTransactionStore = this.persistenceAdapter.createPreparedTransactionStore();
			this.transactionManager = new VMTransactionManager(this, this.preparedTransactionStore);
		}
		this.transactionManager.start();

		// Use the array handed back by the accessor (as addMessageConsumer does)
		// rather than re-reading the field afterwards.
		MessageContainerManager[] managers = getContainerManagers();
		for (int i = 0; i < managers.length; i++) {
			managers[i].start();
		}
	}

	/**
	 * Stops the container managers, then the transaction manager, then the
	 * persistence adapter. Collaborators that were never created are skipped, so
	 * calling this method on a broker that was never started is harmless.
	 * <p>
	 * Each stop call is routed through an {@link ExceptionTemplate}.
	 * NOTE(review): presumably the template records failures so that the remaining
	 * collaborators are still stopped - confirm against ExceptionTemplate.
	 *
	 * @throws JMSException as raised by {@link ExceptionTemplate#throwJMSException()}
	 */
	public void stop() throws JMSException {
		ExceptionTemplate template = new ExceptionTemplate();

		if (this.containerManagers != null) {
			for (int i = 0; i < this.containerManagers.length; i++) {
				final MessageContainerManager containerManager = this.containerManagers[i];
				template.run(new Callback() {
					public void execute() throws Throwable {
						containerManager.stop();
					}
				});
			}
		}
		if (this.transactionManager != null) {
			template.run(new Callback() {
				public void execute() throws Throwable {
					DefaultBroker.this.transactionManager.stop();
				}
			});
		}
		if (this.persistenceAdapter != null) {
			template.run(new Callback() {
				public void execute() throws Throwable {
					DefaultBroker.this.persistenceAdapter.stop();
				}
			});
		}
		template.throwJMSException();
	}

	/**
	 * Passes a non-transacted acknowledgement to every container manager.
	 *
	 * @throws JMSException if no container managers exist yet or a manager fails
	 */
	public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
		checkValid();
		for (int i = 0; i < this.containerManagers.length; i++) {
			this.containerManagers[i].acknowledgeMessage(client, ack);
		}
	}

	/**
	 * Registers an acknowledgement with a transaction: a
	 * {@link MessageAckTransactionTask} is queued as post-commit task, a
	 * {@link RedeliverMessageTransactionTask} as post-rollback task, and every
	 * container manager is told about the transacted acknowledgement.
	 *
	 * @param transactionId id of the local transaction, or the string form of the
	 *                      XA transaction id when {@code ack.isXaTransacted()}
	 * @throws JMSException if no container managers exist yet, the transaction
	 *                      lookup fails or a manager fails
	 */
	public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
			throws JMSException {
		checkValid();
		Transaction transaction = findTransaction(ack.isXaTransacted(), transactionId);
		transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
		transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));

		for (int i = 0; i < this.containerManagers.length; i++) {
			this.containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
		}
	}

	/**
	 * Passes a non-transacted message to every container manager.
	 *
	 * @throws JMSException if no container managers exist yet, the message has no
	 *                      JMS message id, or a manager fails
	 */
	public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
		checkValid();
		if (message.getJMSMessageID() == null) {
			throw new JMSException("No messageID specified for the Message");
		}
		for (int i = 0; i < this.containerManagers.length; i++) {
			this.containerManagers[i].sendMessage(client, message);
		}
	}

	/**
	 * Queues a {@link SendMessageTransactionTask} as post-commit task of the given
	 * transaction; nothing is handed to the container managers here.
	 *
	 * @param transactionId id of the local transaction, or the string form of the
	 *                      XA transaction id when {@code message.isXaTransacted()}
	 * @throws JMSException if the transaction lookup fails
	 */
	public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
			throws JMSException {
		Transaction transaction = findTransaction(message.isXaTransacted(), transactionId);
		transaction.addPostCommitTask(new SendMessageTransactionTask(client, message));
	}

	/**
	 * Registers a consumer with every container manager, creating the managers
	 * first if necessary.
	 *
	 * @throws JMSException if the consumer info has no consumer id or a manager fails
	 */
	public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
		validateConsumer(info);
		MessageContainerManager[] managers = getContainerManagers();
		for (int i = 0; i < managers.length; i++) {
			managers[i].addMessageConsumer(client, info);
		}
	}

	/**
	 * Removes a consumer from every container manager.
	 *
	 * @throws JMSException if the consumer info has no consumer id, no container
	 *                      managers exist yet, or a manager fails
	 */
	public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
		validateConsumer(info);
		checkValid();
		for (int i = 0; i < this.containerManagers.length; i++) {
			this.containerManagers[i].removeMessageConsumer(client, info);
		}
	}

	/**
	 * Asks every container manager to redeliver the message identified by the ack.
	 *
	 * @throws JMSException if no container managers exist yet or a manager fails
	 */
	public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
		checkValid();
		for (int i = 0; i < this.containerManagers.length; i++) {
			this.containerManagers[i].redeliverMessage(client, ack);
		}
	}

	/**
	 * Asks every container manager to delete the named subscription.
	 *
	 * @throws JMSException if no container managers exist yet or a manager fails
	 */
	public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
		checkValid();
		for (int i = 0; i < this.containerManagers.length; i++) {
			this.containerManagers[i].deleteSubscription(clientId, subscriberName);
		}
	}

	/** Creates a local transaction with the given id on the transaction manager. */
	public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
		this.transactionManager.createLocalTransaction(client, transactionId);
	}

	/**
	 * Commits a local transaction: every container manager is notified first, then
	 * the transaction itself is committed with the one-phase flag set.
	 *
	 * @throws JMSException if no container managers exist yet, a manager fails, or
	 *                      the commit raises an {@link XAException} (wrapped as cause)
	 */
	public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
		checkValid();
		try {
			for (int i = 0; i < this.containerManagers.length; i++) {
				this.containerManagers[i].commitTransaction(client, transactionId);
			}
			Transaction transaction = this.transactionManager.getLocalTransaction(transactionId);
			transaction.commit(true);
		} catch (XAException e) {
			throw toJMSException(e);
		}
	}

	/**
	 * Rolls back a local transaction: every container manager is notified first,
	 * then the transaction itself is rolled back.
	 *
	 * @throws JMSException if no container managers exist yet, a manager fails, or
	 *                      the rollback raises an {@link XAException} (wrapped as cause)
	 */
	public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
		checkValid();
		try {
			for (int i = 0; i < this.containerManagers.length; i++) {
				this.containerManagers[i].rollbackTransaction(client, transactionId);
			}
			Transaction transaction = this.transactionManager.getLocalTransaction(transactionId);
			transaction.rollback();
		} catch (XAException e) {
			throw toJMSException(e);
		}
	}

	/** Creates an XA transaction with the given xid on the transaction manager. */
	public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		this.transactionManager.createXATransaction(client, xid);
	}

	/**
	 * Prepares the XA transaction identified by the xid.
	 *
	 * @return the value returned by {@link Transaction#prepare()}
	 */
	public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		Transaction transaction = this.transactionManager.getXATransaction(xid);
		return transaction.prepare();
	}

	/** Rolls back the XA transaction identified by the xid. */
	public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		Transaction transaction = this.transactionManager.getXATransaction(xid);
		transaction.rollback();
	}

	/** Commits the XA transaction identified by the xid, passing on the one-phase flag. */
	public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
		Transaction transaction = this.transactionManager.getXATransaction(xid);
		transaction.commit(onePhase);
	}

	/** Lets the transaction manager, if one exists, clean up after the client. */
	public void cleanUpClient(BrokerClient client) throws JMSException {
		if (this.transactionManager != null) {
			this.transactionManager.cleanUpClient(client);
		}
	}

	/** Returns the prepared XA transactions reported by the transaction manager. */
	public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
		return this.transactionManager.getPreparedXATransactions();
	}

	/**
	 * Returns the temporary directory. When none was set it is derived once from
	 * the <code>activemq.store.tempdir</code> system property, falling back to
	 * <code>ActiveMQTemp</code>.
	 */
	public File getTempDir() {
		if (this.tempDir == null) {
			String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
			this.tempDir = new File(dirName);
		}
		return this.tempDir;
	}

	public String getBrokerName() {
		return this.brokerName;
	}

	public void setTempDir(File tempDir) {
		this.tempDir = tempDir;
	}

	/**
	 * Returns the container managers, creating the default set through
	 * {@link #createContainerManagers()} on first use.
	 */
	public MessageContainerManager[] getContainerManagers() {
		if (this.containerManagers == null) {
			this.containerManagers = createContainerManagers();
		}
		return this.containerManagers;
	}

	public void setContainerManagers(MessageContainerManager[] containerManagers) {
		this.containerManagers = containerManagers;
	}

	public PersistenceAdapter getPersistenceAdapter() {
		return this.persistenceAdapter;
	}

	public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
		this.persistenceAdapter = persistenceAdapter;
	}

	public TransactionManager getTransactionManager() {
		return this.transactionManager;
	}

	public void setTransactionManager(TransactionManager transactionManager) {
		this.transactionManager = transactionManager;
	}

	public PreparedTransactionStore getPreparedTransactionStore() {
		return this.preparedTransactionStore;
	}

	public void setPreparedTransactionStore(PreparedTransactionStore preparedTransactionStore) {
		this.preparedTransactionStore = preparedTransactionStore;
	}

	/** Returns the value limit currently configured on the memory manager. */
	public long getMaximumMemoryUsage() {
		return this.memoryManager.getValueLimit();
	}

	/** Sets the value limit on the memory manager. */
	public void setMaximumMemoryUsage(long maximumMemoryUsage) {
		this.memoryManager.setValueLimit(maximumMemoryUsage);
	}

	/**
	 * Chooses a persistence adapter. The class named by the
	 * {@link #PERSISTENCE_ADAPTER_PROPERTY} system property is tried first (errors
	 * are reported), then the JDBM adapter, then the BDB adapter (both skipped
	 * quietly when unavailable). If none of them yields an adapter a
	 * {@link VMPersistenceAdapter} is returned and a warning is logged.
	 *
	 * @throws JMSException if an adapter class is found but cannot be instantiated
	 */
	protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
		File directory = new File(getStoreDirectory());

		PersistenceAdapter answer = null;
		String property = System.getProperty(PERSISTENCE_ADAPTER_PROPERTY);
		if (property != null) {
			answer = tryCreatePersistenceAdapter(property, directory, false);
		}
		if (answer == null) {
			answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter", directory,
					true);
		}
		if (answer == null) {
			answer = tryCreatePersistenceAdapter("org.codehaus.activemq.store.bdb.BDbPersistenceAdapter", directory,
					true);
		}
		if (answer != null) {
			return answer;
		}

		log.warn(
				"Neither JDBM or BDB on the classpath or property 'activemq.persistenceAdapter' not specified so defaulting to use RAM based message persistence");

		return new VMPersistenceAdapter();
	}

	/**
	 * Loads the named class and invokes its static <code>newInstance(File)</code>
	 * factory.
	 * <p>
	 * An exception thrown <em>by the factory itself</em> is always propagated, even
	 * when <code>ignoreErrors</code> is set: a {@link JMSException} is rethrown as
	 * is, any other {@link Exception} is wrapped. <code>ignoreErrors</code> only
	 * suppresses a missing class, reflection failures, and factory failures whose
	 * cause is not an {@link Exception}.
	 *
	 * @param className    fully qualified name of the adapter class
	 * @param directory    store directory passed to the factory
	 * @param ignoreErrors whether the failures listed above yield <code>null</code>
	 *                     instead of an exception
	 * @return the adapter, or <code>null</code> if it could not be created and the
	 *         failure was ignored
	 * @throws JMSException on any failure that is not ignored
	 */
	protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors)
			throws JMSException {
		Class adapterClass = loadClass(className, ignoreErrors);
		if (adapterClass == null) {
			return null;
		}
		try {
			Method method = adapterClass.getMethod("newInstance", NEWINSTANCE_PARAMETER_TYPES);
			PersistenceAdapter answer = (PersistenceAdapter) method.invoke(null, new Object[] { directory });
			log.info("Using persistence adapter: " + adapterClass.getName());
			return answer;
		} catch (InvocationTargetException e) {
			Throwable cause = e.getTargetException();
			if (cause instanceof JMSException) {
				throw (JMSException) cause;
			}
			if (cause instanceof Exception) {
				throw createInstantiateAdapterException(adapterClass, (Exception) cause);
			}
			if (!ignoreErrors) {
				throw createInstantiateAdapterException(adapterClass, e);
			}
			// Deliberately ignored, but leave a trace for diagnosis.
			log.debug("Ignoring failure to instantiate " + adapterClass.getName(), e);
		} catch (Throwable e) {
			if (!ignoreErrors) {
				throw createInstantiateAdapterException(adapterClass, e);
			}
			// Deliberately ignored, but leave a trace for diagnosis.
			log.debug("Ignoring failure to instantiate " + adapterClass.getName(), e);
		}
		return null;
	}

	/** Builds the exception reported when an adapter class cannot be instantiated. */
	protected JMSException createInstantiateAdapterException(Class adapterClass, Throwable e) {
		return JMSExceptionHelper
				.newJMSException("Could not instantiate instance of " + adapterClass.getName() + ". Reason: " + e, e);
	}

	/**
	 * Loads a class through the thread context class loader, falling back to the
	 * class loader of this object's class. A thread without a context class loader
	 * goes straight to the fallback.
	 *
	 * @param name         fully qualified class name
	 * @param ignoreErrors whether a missing class yields <code>null</code> instead
	 *                     of an exception
	 * @return the class, or <code>null</code> if it was not found and
	 *         <code>ignoreErrors</code> is set
	 * @throws JMSException if the class was not found and <code>ignoreErrors</code>
	 *                      is not set
	 */
	protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
		ClassNotFoundException failure = null;

		// getContextClassLoader() is allowed to return null.
		ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
		if (contextLoader != null) {
			try {
				return contextLoader.loadClass(name);
			} catch (ClassNotFoundException e) {
				failure = e;
			}
		}

		try {
			return getClass().getClassLoader().loadClass(name);
		} catch (ClassNotFoundException e) {
			// Report the first failure, as before, when there was one.
			if (failure == null) {
				failure = e;
			}
		}

		if (ignoreErrors) {
			log.trace("Could not find class: " + name + " on the classpath");
			return null;
		}
		throw JMSExceptionHelper
				.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + failure, failure);
	}

	/**
	 * Returns the store directory name: the {@link #PROPERTY_STORE_DIRECTORY}
	 * system property, or <code>ActiveMQ</code> when it is not set.
	 */
	protected String getStoreDirectory() {
		return System.getProperty(PROPERTY_STORE_DIRECTORY, "ActiveMQ");
	}

	/**
	 * Creates the default container managers: one transient topic manager backed by
	 * the memory manager, plus a durable topic manager and a queue manager backed
	 * by the current persistence adapter.
	 */
	protected MessageContainerManager[] createContainerManagers() {
		MessageContainerManager[] answer = { new TransientTopicBoundedMessageManager(this.memoryManager),
				new DurableTopicMessageContainerManager(this.persistenceAdapter),
				new QueueMessageContainerManager(this.persistenceAdapter) };

		return answer;
	}

	/**
	 * Rejects consumer infos without a consumer id.
	 *
	 * @throws JMSException if {@code info.getConsumerId()} is <code>null</code>
	 */
	protected void validateConsumer(ConsumerInfo info) throws JMSException {
		if (info.getConsumerId() == null) {
			throw new JMSException("No consumerId specified for the ConsumerInfo");
		}
	}

	/**
	 * Verifies that container managers exist.
	 *
	 * @throws JMSException if the container managers have not been created or set
	 */
	protected void checkValid() throws JMSException {
		if (this.containerManagers == null) {
			throw new JMSException(
					"This Broker has not yet been started. Ensure start() is called before invoking action methods");
		}
	}

	/** Looks up the XA or local transaction identified by the id, converting XA lookup failures. */
	private Transaction findTransaction(boolean xaTransacted, String transactionId) throws JMSException {
		if (!xaTransacted) {
			return this.transactionManager.getLocalTransaction(transactionId);
		}
		try {
			return this.transactionManager.getXATransaction(new ActiveMQXid(transactionId));
		} catch (XAException e) {
			throw toJMSException(e);
		}
	}

	/** Wraps an XAException in a JMSException carrying the same message and the original as cause. */
	private static JMSException toJMSException(XAException e) {
		JMSException answer = new JMSException(e.getMessage());
		answer.initCause(e);
		return answer;
	}
}